from celery import Celery
from celery.schedules import crontab

# The broker and the result backend both use the same local Redis database.
_REDIS_URL = "redis://127.0.0.1:6379/0"

app = Celery(
    "periodic",
    broker=_REDIS_URL,
    backend=_REDIS_URL,
)


@app.task
def test(arg):
    """Print ``arg`` to standard output.

    Registered as a task on ``app``; the task signatures built in this
    module each pass it a single string.
    """
    print(arg)


@app.on_after_configure.connect
def setup_periodic_taks(sender, **kwargs):
    """Register this module's periodic ``test`` runs on ``sender``.

    Connected to ``app.on_after_configure``, so it is invoked through that
    signal rather than called directly.

    Args:
        sender: Object exposing ``add_periodic_task``; every schedule is
            registered on it (presumably the Celery app that sent the
            signal -- confirm against the Celery signal documentation).
        **kwargs: Extra signal arguments; accepted and ignored.
    """
    greet_hello = test.s("hello")
    greet_world = test.s("world")
    greet_monday = test.s("Happy Mondays!")

    # Monday (day_of_week=1) at 07:30.
    monday_morning = crontab(day_of_week=1, hour=7, minute=30)

    # test('hello') every 10 seconds, under an explicit entry name.
    sender.add_periodic_task(10.0, greet_hello, name="add every 10")

    # test('world') every 30 seconds; ``expires=10`` is passed through as-is.
    sender.add_periodic_task(30, greet_world, expires=10)

    # test('Happy Mondays!') on the crontab schedule built above.
    sender.add_periodic_task(monday_morning, greet_monday)


# Declarative schedule, written the way it would appear in a config file.
#
# "task" is the registered task name: its "periodic" prefix matches the first
# argument passed to Celery(...) above, and the task name is best kept in line
# with this module's file name as well.
app.conf.beat_schedule = {
    "add-every-5-second": {
        "task": "periodic.test",
        "schedule": 5,
        "args": ("wolaichaoshen",),
    },
}
